package Watermark

import Source.SourceTest.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


// Watermark example: event-time windowing with watermarks
/**
  * Event-time windowing job that reads comma-separated sensor readings
  * (`id,timestamp,temperature`) from a text socket, assigns timestamps and
  * watermarks, and prints the minimum temperature per sensor for every
  * 15-second window. Elements that arrive too late for their window are
  * printed separately through a side output.
  *
  * Usage: `Water [host] [port]` — both arguments are optional and default to
  * the values below.
  */
object Water {

  import scala.util.Try

  /** Socket host used when no command-line argument is supplied. */
  private val DefaultHost = "192.168.1.110"

  /** Socket port used when no second command-line argument is supplied. */
  private val DefaultPort = 7777

  /**
    * Parses one `id,timestamp,temperature` line into a [[SensorReading]].
    *
    * Fields are trimmed before conversion; fields after the third are ignored.
    * A line with fewer than three fields or with non-numeric timestamp /
    * temperature is reported on stderr and skipped, so a single bad line from
    * the socket cannot fail the whole job.
    *
    * @param line raw text line received from the socket
    * @return the parsed reading, or `None` when the line is malformed
    */
  private def parseReading(line: String): Option[SensorReading] = {
    val parsed = line.split(",").map(_.trim) match {
      case Array(id, ts, temp, _*) =>
        Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ =>
        None
    }
    if (parsed.isEmpty) {
      Console.err.println(s"Skipping malformed sensor line: $line")
    }
    parsed
  }

  /**
    * Builds and runs the streaming job.
    *
    * @param args optional `host` (args(0)) and `port` (args(1)) of the text
    *             socket to read from; a non-numeric port fails fast with a
    *             `NumberFormatException` before the job is submitted
    */
  def main(args: Array[String]): Unit = {

    val host = if (args.nonEmpty) args(0) else DefaultHost
    val port = if (args.length > 1) args(1).toInt else DefaultPort

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 1. Use event-time semantics and emit periodic watermarks every 50 ms.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(50)

    val inputStream: DataStream[String] = env.socketTextStream(host, port)

    // 2. Convert raw lines to the SensorReading case class, dropping malformed ones.
    val readings: DataStream[SensorReading] = inputStream
      .flatMap((line: String) => parseReading(line).toList)
      // For strictly ascending (never out-of-order) data the simpler
      // .assignAscendingTimestamps(_.timestamp * 1000L) would be enough.
      // Here the timestamp is extracted from each element and the watermark
      // trails the highest timestamp seen by 3 seconds to tolerate disorder.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
          // Assumes the reading's timestamp is in seconds (TODO confirm);
          // Flink expects epoch milliseconds.
          override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
        })

    // Side-output tag for elements that arrive after the allowed lateness.
    val lateTag = new OutputTag[(String, Double, Long)]("late")

    // 3. Per sensor id, 15-second windows that stay open for late data for
    //    one more minute; anything later is routed to the side output.
    val minTemps: DataStream[(String, Double, Long)] = readings
      .map(reading => (reading.id, reading.temperature, reading.timestamp))
      .keyBy(_._1)
      .timeWindow(Time.seconds(15))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(lateTag)
      // Keep the minimum temperature and the timestamp of the newest element.
      .reduce((acc, next) => (acc._1, acc._2.min(next._2), next._3))

    minTemps.getSideOutput(lateTag).print("late")
    minTemps.print("result")

    env.execute()
  }

}
